feat(worker): ingest_callback contract on Backend (INT-386)#415
feat(worker): ingest_callback contract on Backend (INT-386)#415ldrozdz93 wants to merge 6 commits into
Conversation
Introduces IngestError / IngestUnavailable / IngestRejected to be raised by the worker-supplied ingest callback when pipeline writes fail. The hierarchy is the contract used by integrations to differentiate transient vs. permanent failures. Refs INT-386, INT-312 See: https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends Backend.__init__ with keyword-only ingest_callback, policy, plus open **kwargs (forward-compat door). PolicyRunner.setup now builds a per-policy ingest callback closure and passes both to backend construction. The closure chunks+ingests entities (mirroring run()), tracks each off-schedule emission as a pseudo-run in RunStore, and translates response/transport errors into the new IngestError subclasses. run() is unchanged and remains single-shot per cron tick. Refs INT-386, INT-312 See: https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/policy-distribution-refinement.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Coverage Report
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Pull request overview
This PR extends the worker backend interface to support “off-schedule” entity ingestion via a construction-time ingest_callback, and introduces a dedicated exception hierarchy for integrations to signal ingest failures.
Changes:
- Add
ingest_callbackandpolicykeyword-only parameters (plus forward-compatible**kwargs) toworker.backend.Backend.__init__. - Introduce
worker.exceptionswithIngestError,IngestUnavailable, andIngestRejectedfor callback-facing error signaling. - Update
PolicyRunner.setup()to build and pass an ingest-callback closure that chunks/ingests entities, records pseudo-runs inRunStore, and maps transport/response failures into the new exceptions.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| worker/worker/policy/runner.py | Builds and passes an ingest-callback closure during setup; adds ingest callback implementation and error mapping. |
| worker/worker/exceptions.py | Adds ingest-callback exception hierarchy for integrations. |
| worker/worker/backend.py | Extends Backend constructor to accept ingest_callback, policy, and absorb future kwargs. |
| worker/tests/test_exceptions.py | Adds unit tests for exception inheritance and message preservation. |
| worker/tests/test_backend.py | Adds unit tests for new backend constructor behavior and backward compatibility. |
| worker/tests/policy/test_runner.py | Adds tests verifying setup passes kwargs and callback behavior (happy path, error path, chunking, error translation). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Bind IngestError in `except ... as exc` instead of sys.exc_info() and drop the now-unused `import sys`. - Log unexpected exceptions via `logger.exception` before translating them to IngestUnavailable, so a worker-side programming bug isn't silently masked as a transient failure. - Annotate the closure signature (`error: Exception | None = None`, `-> None`) and rename `**kw` → `**kwargs` to match the docstring and Backend.__init__ contract. Comment the forward-compat door. - Guard against an integration calling the callback before the diode client is initialised (e.g. from inside `Backend.setup()`): record a FAILED pseudo-run and raise IngestUnavailable with a clear message instead of letting an AttributeError surface as an unexpected exception. - Extract `_send_entities` so `_build_ingest_callback` stays under the C901 complexity limit. - Add a unit test for the new guard. Refs INT-386, INT-312 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7e962a3264
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
@leoparente @jajeffries sorry for the noise - I undrafted by mistake. Will let you know once the PR is proper quality |
…k helper Addresses 6 reviewer comments on PR #415: - Init-order race (Copilot/Codex/jajeffries): the closure dereferences `self.run_store`, `self.metadata`, `self._diode_client` — none of which are attached until after `backend.setup()` returns. Replaces the partial `_diode_client is None` guard with a single `self._callback_ready` flag, flipped True at the end of `setup()` and back to False at the start of `stop()`. Single source of truth for "callback is safe to invoke". - Pre-try work (Copilot): `list(entities)` and `apply_run_id_to_entities` now run INSIDE the try, so failures there reliably mark the pseudo-run FAILED instead of leaving it RUNNING. `entities_list = []` is pre-bound so `len()` is defined in the except handlers when `list(entities)` itself fails. - Chunk-threshold constant (Copilot): extracts `MAX_INGEST_MESSAGE_BYTES = 3 * 1024 * 1024` at module scope; replaces both float literals in `_send_entities` and `run()`. - Helper consistency (leoparente): `run()` now calls `_send_entities` instead of inlining the chunking branch. Helper returns the chunk count for the existing log line. Side effect: `run()` now raises `IngestRejected` (not `RuntimeError`) on response errors; outer `except Exception` catches it unchanged. Backend.__init__ docstring documents the "do not invoke from __init__/setup()" contract. Adds 4 new tests, renames one existing test to match the new guard: - test_ingest_callback_raises_when_not_ready (renamed) - test_ingest_callback_records_failure_on_apply_run_id_error - test_ingest_callback_records_failure_on_iterable_error - test_setup_sets_callback_ready_flag - test_stop_clears_callback_ready_flag ruff clean, pytest 114 passed. Refs INT-386, INT-312 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…spect on construction - ADR-0008: Drop policy= kwarg from Backend.__init__ (no consumer in v1). - ADR-0009: Add **kwargs forward-compat door to Backend.run. - ADR-0007: PolicyRunner uses inspect.signature to detect whether the backend class accepts ingest_callback. Legacy backends with __init__(self) continue to construct zero-arg with no coordinated upgrade. 49 tests pass including 3 new parametrized introspection cases. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Represents the contract bump shipped in this branch (ingest_callback keyword on Backend.__init__, **kwargs door on Backend.run, exceptions module, introspection in PolicyRunner.setup). Downstream consumers like nbl-infoblox-nios floor >=1.3.0, so the pre-bump 1.0.0 placeholder was causing pip resolver conflicts when both packages get installed as editable via INSTALL_DRIVERS_PATH. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Context
The
ingest_callbackprimitive added here is required for theAPI-triggered sync HTTP endpoint
in
controller-integrationsto work — that endpoint pushes entitiesoutside the cron
run()cycle and needs a callback to reach theworker's Diode pipeline.
Summary
Extend
worker.backend.Backendso that:__init__accepts aningest_callbackkeyword argument (optional,keyword-only, plus open
**kwargsforward-compat door).worker.exceptionsmodule exposesIngestError,IngestUnavailable,IngestRejected— raised by the ingestcallback on pipeline failures.
PolicyRunner.setupbuilds a per-policy ingest-callback closure andpasses it to the backend constructor. The closure chunks+ingests
entities mirroring
run(), records each off-schedule emission as apseudo-run in
RunStore, and translates response/transport errorsinto the new exception subclasses.
PolicyRunner.run()is unchanged and remains single-shot per cron tick.This unblocks API-triggered sync (INT-312, INT-386) and any future
integration pattern that needs to emit entities outside of the
single-shot
run()cycle.Design
Design doc:
worker-backend-ingest-callback.md
in
netboxlabs/controller-integrations.Compat
__init__orcustom
__init__with**kwargs) continue working unchangedthanks to
_construct_backend'sinspect.signatureintrospection.controller-integrationsdefine__init__(self) -> Noneand need a mechanical**kwargseditbefore this minor reaches production:
nbl-msft-dhcp,nbl-proxmox-ve. Follow-up ticket in that repo tracks the bump— does NOT block this PR (tests here mock the backend class).
Test plan
worker/tests/test_exceptions.py— hierarchy (IngestError/IngestUnavailable/IngestRejected) catchable as base; messagespreserved.
worker/tests/test_backend.py—ingest_callbackstored on theinstance; zero-arg construction still works; unknown kwargs absorbed.
worker/tests/policy/test_runner.py—setuppassesingest_callbackto backend; callback happy path;error=path;TypeErrorwhen both or neither supplied; transport errors →IngestUnavailable; response errors →IngestRejected; largepayloads chunked; the existing
run()path remains unaffected.Locally:
cd worker && pip install -e ".[test]" && pytest→ 109passed, 0 failures.
Judgement calls
_build_ingest_callbacklives onPolicyRunner(not a standalonehelper module) because the closure must capture
self.run_store,self.metadata, and the diodeclient. Splitting it out addsplumbing without testability benefit — tests extract the closure
via
mock_backend_class.call_args.kwargs["ingest_callback"]andexercise it directly.
self._diode_clientlazily so it can be built BEFOREthe diode client (which needs
metadatafrombackend.setup(),which needs the constructed backend). Solves the chicken-and-egg
between "construct backend with callback" and "callback needs
client".
response.errorsnon-empty →IngestRejected;any other transport exception →
IngestUnavailable. Coarse butuseful first cut; refinements remain additive.
run_idkwarg from the integration is NOT honoured in v1 — theclosure always creates a fresh pseudo-run. Documented as future
open-kwarg evolution in the design doc.
Refs INT-386, INT-312.